---
title: "AutoGen"
description: "Integrate AgentOps with Microsoft AutoGen for multi-agent workflow tracking"
---

[AutoGen](https://microsoft.github.io/autogen/stable/) is Microsoft's framework for building multi-agent conversational AI systems. AgentOps integrates with AutoGen to track and monitor your multi-agent workflows.

## Installation

<CodeGroup>
```bash pip
pip install agentops autogen-core python-dotenv
```
```bash poetry
poetry add agentops autogen-core python-dotenv
```
```bash uv
uv pip install agentops autogen-core python-dotenv
```
</CodeGroup>

## Setting Up API Keys

Before using AutoGen with AgentOps, obtain the following API keys:
- **OPENAI_API_KEY**: From the [OpenAI Platform](https://platform.openai.com/api-keys)
- **AGENTOPS_API_KEY**: From your [AgentOps Dashboard](https://app.agentops.ai/)

Next, either export them as environment variables or set them in a `.env` file.

<CodeGroup>
    ```bash Export to CLI
    export OPENAI_API_KEY="your_openai_api_key_here"
    export AGENTOPS_API_KEY="your_agentops_api_key_here"
    ```
    ```txt Set in .env file
    OPENAI_API_KEY="your_openai_api_key_here"
    AGENTOPS_API_KEY="your_agentops_api_key_here"
    ```
</CodeGroup>

Then load the environment variables in your Python code:

```python
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

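# Set up environment variables with placeholder fallback values.
# Without a default, os.getenv returns None for an unset variable and the
# assignment to os.environ raises TypeError.
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "your_openai_api_key_here")
os.environ["AGENTOPS_API_KEY"] = os.getenv("AGENTOPS_API_KEY", "your_agentops_api_key_here")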
```
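
If you would rather fail fast than continue with a missing key, a small check along these lines can run right after `load_dotenv()`. This is a minimal sketch: `REQUIRED_KEYS`, `missing_keys`, and `require_keys` are hypothetical helper names for illustration, not part of AgentOps or python-dotenv.

```python
import os
from typing import Iterable, List, Mapping

# The two variables this guide asks you to set
REQUIRED_KEYS = ("OPENAI_API_KEY", "AGENTOPS_API_KEY")


def missing_keys(env: Mapping[str, str], required: Iterable[str] = REQUIRED_KEYS) -> List[str]:
    """Return the required variable names that are absent or empty in env."""
    return [name for name in required if not env.get(name)]


def require_keys(env: Mapping[str, str] = os.environ) -> None:
    """Raise RuntimeError naming every missing variable (hypothetical helper)."""
    missing = missing_keys(env)
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
```

Calling `require_keys()` before `agentops.init()` surfaces a configuration problem as one clear error instead of a failure later in the run.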

## Usage

AgentOps automatically instruments AutoGen agents and tracks their interactions. Initialize AgentOps with `agentops.init()` before creating your AutoGen agents.

<CodeGroup>
```python Countdown
import asyncio
from dataclasses import dataclass
from typing import Callable
import agentops

from autogen_core import (
    DefaultTopicId, 
    MessageContext, 
    RoutedAgent, 
    default_subscription, 
    message_handler,
    AgentId,
    SingleThreadedAgentRuntime
)

# Initialize AgentOps
agentops.init()

@dataclass
class CountdownMessage:
    """Message containing a number for countdown operations"""
    content: int

@default_subscription
class ModifierAgent(RoutedAgent):
    """Agent that modifies numbers by applying a transformation function"""
    
    def __init__(self, modify_val: Callable[[int], int]) -> None:
        super().__init__("A modifier agent that transforms numbers.")
        self._modify_val = modify_val

    @message_handler
    async def handle_message(self, message: CountdownMessage, ctx: MessageContext) -> None:
        """Handle incoming messages and apply modification"""
        original_val = message.content
        modified_val = self._modify_val(original_val)
        
        print(f"🔧 ModifierAgent: Transformed {original_val} → {modified_val}")
        
        # Publish the modified value to continue the workflow
        await self.publish_message(
            CountdownMessage(content=modified_val), 
            DefaultTopicId()
        )

@default_subscription  
class CheckerAgent(RoutedAgent):
    """Agent that checks if a condition is met and decides whether to continue"""
    
    def __init__(self, stop_condition: Callable[[int], bool]) -> None:
        super().__init__("A checker agent that validates conditions.")
        self._stop_condition = stop_condition

    @message_handler
    async def handle_message(self, message: CountdownMessage, ctx: MessageContext) -> None:
        """Handle incoming messages and check stopping condition"""
        value = message.content
        
        if not self._stop_condition(value):
            print(f"✅ CheckerAgent: {value} passed validation, continuing workflow")
            # Continue the workflow by publishing the message
            await self.publish_message(
                CountdownMessage(content=value), 
                DefaultTopicId()
            )
        else:
            print(f"🛑 CheckerAgent: {value} met the stop condition, stopping workflow")
            print("🎉 Countdown completed successfully!")

async def run_countdown_workflow():
    """Run a countdown workflow from 10 to 1 using AutoGen agents"""
    
    print("🚀 Starting AutoGen Countdown Workflow")
    print("=" * 50)
    
    # Create the AutoGen runtime
    runtime = SingleThreadedAgentRuntime()
    
    # Register the modifier agent (subtracts 1 from each number)
    await ModifierAgent.register(
        runtime,
        "modifier",
        lambda: ModifierAgent(modify_val=lambda x: x - 1),
    )
    
    # Register the checker agent (stops when value <= 1)
    await CheckerAgent.register(
        runtime,
        "checker", 
        lambda: CheckerAgent(stop_condition=lambda x: x <= 1),
    )
    
    # Start the runtime
    runtime.start()
    print("🤖 AutoGen runtime started")
    print("📨 Sending initial message with value: 10")
    
    # Send initial message to start the countdown
    await runtime.send_message(
        CountdownMessage(10), 
        AgentId("checker", "default")
    )
    
    # Wait for the workflow to complete
    await runtime.stop_when_idle()
    
    print("=" * 50)
    print("✨ Workflow completed! Check your AgentOps dashboard for detailed traces.")

# Run the workflow
if __name__ == "__main__":
    asyncio.run(run_countdown_workflow())
```

```python Multi-Agent
import asyncio
from dataclasses import dataclass
from typing import List, Dict, Any
import agentops

from autogen_core import (
    DefaultTopicId,
    MessageContext, 
    RoutedAgent,
    default_subscription,
    message_handler,
    AgentId,
    SingleThreadedAgentRuntime
)

# Initialize AgentOps
agentops.init()

@dataclass
class DataMessage:
    """Message containing data to be processed"""
    data: List[Dict[str, Any]]
    stage: str
    metadata: Dict[str, Any]

@default_subscription
class DataCollectorAgent(RoutedAgent):
    """Agent responsible for collecting and preparing initial data"""
    
    def __init__(self) -> None:
        super().__init__("Data collector agent that gathers initial dataset.")

    @message_handler
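    async def handle_message(self, message: DataMessage, ctx: MessageContext) -> None:
        # Every agent shares the default topic, so ignore messages published by later stages;
        # without this guard the collector would restart the pipeline indefinitely
        if message.stage != "collection":
            return

        print(f"📊 DataCollector: Collecting data for {message.metadata.get('source', 'unknown')}")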
        
        # Simulate data collection
        collected_data = [
            {"id": 1, "value": 100, "category": "A"},
            {"id": 2, "value": 200, "category": "B"}, 
            {"id": 3, "value": 150, "category": "A"},
            {"id": 4, "value": 300, "category": "C"},
        ]
        
        print(f"✅ DataCollector: Collected {len(collected_data)} records")
        
        # Send to processor
        await self.publish_message(
            DataMessage(
                data=collected_data,
                stage="processing",
                metadata={**message.metadata, "collected_count": len(collected_data)}
            ),
            DefaultTopicId()
        )

@default_subscription
class DataProcessorAgent(RoutedAgent):
    """Agent that processes and transforms data"""
    
    def __init__(self) -> None:
        super().__init__("Data processor agent that transforms collected data.")

    @message_handler  
    async def handle_message(self, message: DataMessage, ctx: MessageContext) -> None:
        if message.stage != "processing":
            return
            
        print(f"⚙️ DataProcessor: Processing {len(message.data)} records")
        
        # Process data - add calculated fields
        processed_data = []
        for item in message.data:
            processed_item = {
                **item,
                "processed_value": item["value"] * 1.1,  # 10% increase
                "status": "processed"
            }
            processed_data.append(processed_item)
        
        print(f"✅ DataProcessor: Processed {len(processed_data)} records")
        
        # Send to analyzer
        await self.publish_message(
            DataMessage(
                data=processed_data,
                stage="analysis", 
                metadata={**message.metadata, "processed_count": len(processed_data)}
            ),
            DefaultTopicId()
        )

@default_subscription
class DataAnalyzerAgent(RoutedAgent):
    """Agent that analyzes processed data and generates insights"""
    
    def __init__(self) -> None:
        super().__init__("Data analyzer agent that generates insights.")

    @message_handler
    async def handle_message(self, message: DataMessage, ctx: MessageContext) -> None:
        if message.stage != "analysis":
            return
            
        print(f"🧠 DataAnalyzer: Analyzing {len(message.data)} records")
        
        # Perform analysis
        total_value = sum(item["processed_value"] for item in message.data)
        avg_value = total_value / len(message.data)
        categories = set(item["category"] for item in message.data)
        
        analysis_results = {
            "total_records": len(message.data),
            "total_value": total_value,
            "average_value": avg_value,
            "unique_categories": len(categories),
            "categories": sorted(categories)  # sorted so the printed order is deterministic
        }
        
        print("📈 DataAnalyzer: Analysis complete")
        print(f"   • Total records: {analysis_results['total_records']}")
        print(f"   • Average value: {analysis_results['average_value']:.2f}")
        print(f"   • Categories: {', '.join(analysis_results['categories'])}")
        
        # Send to reporter
        await self.publish_message(
            DataMessage(
                data=message.data,
                stage="reporting",
                metadata={
                    **message.metadata, 
                    "analysis": analysis_results
                }
            ),
            DefaultTopicId()
        )

@default_subscription
class ReportGeneratorAgent(RoutedAgent):
    """Agent that generates final reports"""
    
    def __init__(self) -> None:
        super().__init__("Report generator agent that creates final output.")

    @message_handler
    async def handle_message(self, message: DataMessage, ctx: MessageContext) -> None:
        if message.stage != "reporting":
            return
            
        print("📝 ReportGenerator: Generating final report")
        
        analysis = message.metadata.get("analysis", {})
        
        report = f"""
🎯 DATA PROCESSING REPORT
========================
Source: {message.metadata.get('source', 'Unknown')}
Processing Date: {message.metadata.get('timestamp', 'Unknown')}

📊 SUMMARY STATISTICS:
• Total Records Processed: {analysis.get('total_records', 0)}
• Total Value: ${analysis.get('total_value', 0):,.2f}
• Average Value: ${analysis.get('average_value', 0):,.2f}
• Unique Categories: {analysis.get('unique_categories', 0)}
• Categories Found: {', '.join(analysis.get('categories', []))}

✅ Processing pipeline completed successfully!
        """
        
        print(report)
        print("🎉 Multi-agent data processing workflow completed!")

async def run_data_processing_pipeline():
    """Run a complete data processing pipeline using multiple AutoGen agents"""
    
    print("🚀 Starting AutoGen Data Processing Pipeline")
    print("=" * 60)
    
    # Create runtime
    runtime = SingleThreadedAgentRuntime()
    
    # Register all agents
    await DataCollectorAgent.register(
        runtime,
        "collector",
        lambda: DataCollectorAgent(),
    )
    
    await DataProcessorAgent.register(
        runtime,
        "processor", 
        lambda: DataProcessorAgent(),
    )
    
    await DataAnalyzerAgent.register(
        runtime,
        "analyzer",
        lambda: DataAnalyzerAgent(),
    )
    
    await ReportGeneratorAgent.register(
        runtime,
        "reporter",
        lambda: ReportGeneratorAgent(),
    )
    
    # Start runtime
    runtime.start()
    print("🤖 AutoGen runtime with 4 agents started")
    
    # Trigger the pipeline
    initial_message = DataMessage(
        data=[],
        stage="collection",
        metadata={
            "source": "customer_database",
            "timestamp": "2024-01-15T10:30:00Z",
            "pipeline_id": "data_proc_001"
        }
    )
    
    print("📨 Triggering data processing pipeline...")
    await runtime.send_message(
        initial_message,
        AgentId("collector", "default")
    )
    
    # Wait for completion
    await runtime.stop_when_idle()
    
    print("=" * 60)
    print("✨ Pipeline completed! Check AgentOps dashboard for detailed agent traces.")

# Run the pipeline
if __name__ == "__main__":
    asyncio.run(run_data_processing_pipeline())
```
</CodeGroup>
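
In the Multi-Agent example all four agents share the default topic, so each published `DataMessage` reaches every subscribed agent except the one that published it (AutoGen Core does not deliver a published message back to its sender). The `stage` field is what lets a handler ignore messages meant for another step; a handler without such a check would react to every stage. The routing can be sketched in plain Python as follows. This is a simplified simulation, not the `autogen_core` API, and `Msg`, `stage_handler`, and `run` are hypothetical names used only for illustration.

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable, Deque, Dict, List, Optional, Tuple


@dataclass
class Msg:
    stage: str


# A handler returns the next message to publish, or None if it ignores the input
Handler = Callable[[Msg], Optional[Msg]]


def stage_handler(accepts: str, emits: str) -> Handler:
    """Build a handler that ignores every stage except `accepts`."""
    def handle(msg: Msg) -> Optional[Msg]:
        if msg.stage != accepts:
            return None  # message is meant for a different agent
        return Msg(emits)
    return handle


def run(agents: Dict[str, Handler], first: Msg, recipient: str) -> List[Tuple[str, str]]:
    """Send `first` directly to one agent, then broadcast each reply to all other agents."""
    trace: List[Tuple[str, str]] = []           # (agent, stage it acted on)
    pending: Deque[Tuple[str, Msg]] = deque()   # (sender, message) awaiting broadcast

    reply = agents[recipient](first)            # direct send, like runtime.send_message
    if reply is not None:
        trace.append((recipient, first.stage))
        pending.append((recipient, reply))

    while pending:
        sender, msg = pending.popleft()
        for name, handler in agents.items():
            if name == sender:
                continue                        # publishers do not receive their own message
            reply = handler(msg)
            if reply is not None:
                trace.append((name, msg.stage))
                pending.append((name, reply))
    return trace


AGENTS: Dict[str, Handler] = {
    "collector": stage_handler("collection", "processing"),
    "processor": stage_handler("processing", "analysis"),
    "analyzer": stage_handler("analysis", "reporting"),
    "reporter": stage_handler("reporting", "done"),  # no agent accepts "done", so the run ends
}

trace = run(AGENTS, Msg("collection"), "collector")
print(trace)
```

Each agent acts exactly once, in pipeline order, even though every broadcast is offered to three agents.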

## Examples

<CardGroup cols={2}>
  <Card title="Agent Chat Example" icon="notebook" href="/v2/examples/autogen">
    Demonstrates basic multi-agent chat functionality.
  </Card>
  <Card title="Math Agent Example" icon="notebook" href="https://github.com/AgentOps-AI/agentops/blob/main/examples/autogen/MathAgent.ipynb" newTab={true}>
    Demonstrates an agent specialized for mathematical problem-solving.
  </Card>
</CardGroup>

Visit your [AgentOps Dashboard](https://app.agentops.ai) to see detailed traces of your AutoGen agent interactions, performance metrics, and workflow analytics.


<script type="module" src="/scripts/github_stars.js"></script>
<script type="module" src="/scripts/scroll-img-fadein-animation.js"></script>
<script type="module" src="/scripts/button_heartbeat_animation.js"></script>
<script type="module" src="/scripts/adjust_api_dynamically.js"></script>
